同步容器类包括Vector和Hashtable以及Collections.synchronizedXxx等工厂方法创建的,这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。
同步容器类通过其自身的锁来保护它的每一个方法。
看下面这种情况:
123456789public static Object getlast(Vector list) {int lastIndex = list.size() - 1;return list.get(lastIndex);}public static void deleteLast(Vector list) {int lastIndex = list.size() - 1;list.remove(lastIndex);}虽然Vector.get()和Vectot.remove()方法都是同步方法,但是在getLast()方法和deleteLast()方法中都先经历了list.size()方法,当getLast()方法执行完
int lastIndex = list.size() - 1;
后,如果此时有另外一个线程执行deleteLast()方法,那么lastIndex的值就是无效的,因此,这两个方法不是同步的。我们可以这样解决:
12345678910111213public static Object getlast(Vector list) {synchronized(list) {int lastIndex = list.size() - 1;return list.get(lastIndex);}}public static void deleteLast(Vector list) {synchronized(list) {int lastIndex = list.size() - 1;list.remove(lastIndex);}}对容器类进行迭代的标准方式是使用Iterator,然而,如果有其他线程并发的修改容器,那么即使是使用迭代器也无法避免在迭代期间对容器加锁。当容器类在迭代过程中被修改时,会抛出ConcurrentModificationException异常。
如果不希望在迭代期间对容器加锁,那么一种替代方法就是克隆容器,并在副本上进行迭代,由于副本封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就可以避免出现ConcurrentModificationException异常。
同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性,这种方法的代价是严重降低并发性,Java5.0提供了多种并发容器类来改进同步容器的性能。
并发容器是针对多个线程并发访问设计的,在Java5.0中增加了ConcurrentHashMap用来替代同步且基于散列的Map以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。
同步容器类在执行每个操作期间都持有一个锁,与HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性,ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制成为分段锁。
在这种机制中,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发的修改Map。
ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器进行加锁
ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作,在ConcurrentHashMap中一些常见的复合操作例如若没有则添加、若相等则移除和若相等则替换都已经实现为原子操作并且在ConcurrentHashMap接口中声明。
123456public interface ConcurrentHashMap<K, V> extends Map<K, V> {V putIfAbsent(K key, V value);boolean remove(K key, V value);boolean replace (K key, V oldValue, V newValue);V replace(K key, V newValue);}上面的这些方法在ConcurrentHashMap中已经实现为同步方法。
CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。
在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。
因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰,“写入时复制”容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。一种最常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。
阻塞队列提供的offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态,这样你就能够创建更多灵活的策略来处理负荷过载的情况,例如减轻负载,将多余的工作项序列并写入硬盘,减少生产者线程的数量。
在类库中包含了BlockingQueue的多种实现,其中,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者区别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorotyBlockingQueue是一个按照优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将十分有用。
CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量,countDown方法递减计数器,表示有一个事件发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生,如果计数器的值非零,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。
看下面这个例子,创建一定数量的线程,利用他们并发的执行指定的认为。在这里例子中使用了两个闭锁,分别表示起始门和结束门,起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量:
1234567891011121314151617181920212223242526272829public class TestHarness {public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThreads);for (int i = 0; i < nThreads; i++) {Thread t = new Thread() {public void run() {try {startGate.await();try {task.run();} finally {endGate.countDown();}} catch(InterruptedException ignored) {}}};t.start();}long start = System.nanoTime();startGate.countDown();endGate.await();long end = System.nanoTime();return end - start;}}这里启动门将使得主线程能够同时释放所有工作线程,而结束门则使主线程能够等到最后一个线程执行完成。
FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,Future.get的行为取决于任务的状态,如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常,FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程。
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动:
123456789101112131415161718192021222324252627public class Preloader {private final FutureTask<ProductInfo> future =new FutureTask<ProductInfo>(new Callable<ProductInfo>() {public ProductInfo call() throws DataLoadException {return loadProductInfo();}});private final Thread thread = new Thread(future);public void start() {thread.start();}public ProductInfo get() throws DataLoadException, InterruptedException {try {return future.get();} catch (ExecutionException e) {Throwable cause = e.getCause();if (cause instanceof DataLoadException) {throw (DataLoadException) cause;} else {throw launderThrowable(cause); // throw 语句可以调用函数}}}}在上面的程序中,当程序随后需要ProductInfo时,可以调用get方法,如果数据已经加载,那么将返回这些数据,否则将等待加载完成后再返回。
计数信号量(Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量,计数信号量还可以用来实现某种资源池。
Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数指定,在执行操作时首先获得许可,并在使用以后释放许可。如果没有许可,那么acquire将阻塞到有许可,release方法将放回一个许可信号量。
Semaphore可以用于实现资源池,例如数据库连接池,我们可以构造一个固定长度的资源池,当池为空时,请求资源就会失败,但你真正想看到的行为是阻塞而不是失败,并且当非空时解除阻塞。如果将Semaphore
的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池后调用release释放许可,那么acquire将一直阻塞知道资源池不为空。
可以使用Semaphore将任何一种容器变成有界阻塞容器,如下所示:
123456789101112131415161718192021222324252627282930public class BoundedHashSet<T> {private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException {sem.acquire();boolean wasAdded = false;try {wasAdded = set.add(o);return wasAdded;} finally {if (!wasAdded) {sem.release();}}}public boolean remove(Object o) {boolean wasRemoved = set.remove(o);if (wasRemoved) {sem.release();}return wasRemoved;}}在上面的代码中,信号量的初始值会被初始化为容器的最大值,add操作在向底层容器中添加一个元素之前,会首先获取一个许可,如果add操作没有成功,那么会释放许可,否则就会一直占用着许可,也即少了一个许可,对应到容器中就是容器中已经有了一个元素,容器的空间减一。
闭锁是一次性对象,一旦进入终止状态,就不能被重置。
Barrier类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的关键区别在于所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的参与方反复的在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法将一个问题拆分为一系列相互独立的子问题,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。
如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException,如果成功的通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来选举产生一个领导线程,并在下次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会在一个子任务线程中执行它,但在阻塞线程被释放前不会被执行。
构造函数CyclicBarrier(int parties, Runnable barrierAction),当线程在CyclicBarrier对象上调用await()方法时,栅栏的计数器将增加1,当计数器为parties时,栅栏将打开。